Aktuell funktioniert "Teil B: Verschlüsseltes Ansammeln" in diesem Notebook nicht, weil Numpy noch nicht eingebunden ist.

Der aktuelle Stand kann hier eingesehen werden: https://github.com/OpenMined/PySyft/issues/2771.

Diese Mitteilung wird entfernt, sobald Teil B wieder funktioniert.

Teil 10: Federated Learning mit dem Sammeln verschlüsselter Gradienten

In den letzten Abschnitten wurde verschlüsseltes Berechnen mit diversen kleinen Programmen vorgestellt. In diesem Teil wird erneut das Federated Learning Beispiel aus Teil 4 überarbeitet. Darin ging es darum die verbesserten Modele der einzelnen Helfer auf einem "vertrauenswürdigen Helfer" zu sammeln und zu mitteln.

Hier werden nun die neuen Werkzeuge fürs verschlüsselte Berechnen genutzt, um diesen vertrauenswürdigen Helfer zu ersetzen, denn nicht in jedem Szenario können die sensiblen Informationen einer weiteren Instanz anvertraut werden.

In diesem Notebook wird gezeigt, wie mit SMPC ein sicheres Sammeln umzusetzen ist, um somit den "vertrauenswürdigen Helfer" nicht einbinden zu müssen.

Autoren:

Übersetzer:

Abschnitt 1: Normales Federated Learning

Hierbei handelt es sich um klassisches Federated Learning auf dem "Boston Housing"-Datensatz. Der Abschnitt wird aufgeteilt in mehrere kleinere Sektionen.

Einrichten des Programms


In [ ]:
import pickle

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

class Parser:
    """Parameters for training"""
    def __init__(self):
        self.epochs = 10
        self.lr = 0.001
        self.test_batch_size = 8
        self.batch_size = 8
        self.log_interval = 10
        self.seed = 1
    
args = Parser()

torch.manual_seed(args.seed)
kwargs = {}

Laden des Datensatzes


In [ ]:
with open('../data/BostonHousing/boston_housing.pickle','rb') as f:
    ((X, y), (X_test, y_test)) = pickle.load(f)

X = torch.from_numpy(X).float()
y = torch.from_numpy(y).float()
X_test = torch.from_numpy(X_test).float()
y_test = torch.from_numpy(y_test).float()
# preprocessing
mean = X.mean(0, keepdim=True)
dev = X.std(0, keepdim=True)
mean[:, 3] = 0. # the feature at column 3 is binary,
dev[:, 3] = 1.  # so we don't standardize it
X = (X - mean) / dev
X_test = (X_test - mean) / dev
train = TensorDataset(X, y)
test = TensorDataset(X_test, y_test)
train_loader = DataLoader(train, batch_size=args.batch_size, shuffle=True, **kwargs)
test_loader = DataLoader(test, batch_size=args.test_batch_size, shuffle=True, **kwargs)

Struktur des Neuralen Netzwerkes


In [ ]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(13, 32)
        self.fc2 = nn.Linear(32, 24)
        self.fc3 = nn.Linear(24, 1)

    def forward(self, x):
        x = x.view(-1, 13)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

model = Net()
optimizer = optim.SGD(model.parameters(), lr=args.lr)

Anbinden an PyTorch


In [ ]:
import syft as sy

hook = sy.TorchHook(torch)
bob = sy.VirtualWorker(hook, id="bob")
alice = sy.VirtualWorker(hook, id="alice")
james = sy.VirtualWorker(hook, id="james")

compute_nodes = [bob, alice]

Senden der Daten an die Helfer
Normalerweise befinden sich die Daten schon auf dem Helfer, nur aus demonstrationszwecken wird es hier manuell gesendet.


In [ ]:
train_distributed_dataset = []

for batch_idx, (data,target) in enumerate(train_loader):
    data = data.send(compute_nodes[batch_idx % len(compute_nodes)])
    target = target.send(compute_nodes[batch_idx % len(compute_nodes)])
    train_distributed_dataset.append((data, target))

Trainings Funktion


In [ ]:
def train(epoch):
    model.train()
    for batch_idx, (data,target) in enumerate(train_distributed_dataset):
        worker = data.location
        model.send(worker)

        optimizer.zero_grad()
        # update the model
        pred = model(data)
        loss = F.mse_loss(pred.view(-1), target)
        loss.backward()
        optimizer.step()
        model.get()
            
        if batch_idx % args.log_interval == 0:
            loss = loss.get()
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * data.shape[0], len(train_loader),
                       100. * batch_idx / len(train_loader), loss.item()))

Test Funktion


In [ ]:
def test():
    model.eval()
    test_loss = 0
    for data, target in test_loader:
        output = model(data)
        test_loss += F.mse_loss(output.view(-1), target, reduction='sum').item() # sum up batch loss
        pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability
        
    test_loss /= len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}\n'.format(test_loss))

Training des Models


In [ ]:
import time

In [ ]:
t = time.time()

for epoch in range(1, args.epochs + 1):
    train(epoch)

    
total_time = time.time() - t
print('Total', round(total_time, 2), 's')

Berechnen der Performance


In [ ]:
test()

Abschnitt 2: Hinzufügen vom verschlüsselten Ansammeln

Nun wird das Beispiel leicht modifiziert, um die Gradienten auf verschlüsselte Weise anzusammeln. Hauptsächlich werden dafür nur ein oder zwei Zeilen der train()-Funktion angepasst. Der Datensatz wird neu vorverarbeitet und das Model für Bob und Alice neu initialisiert.


In [ ]:
remote_dataset = (list(),list())

train_distributed_dataset = []

for batch_idx, (data,target) in enumerate(train_loader):
    data = data.send(compute_nodes[batch_idx % len(compute_nodes)])
    target = target.send(compute_nodes[batch_idx % len(compute_nodes)])
    remote_dataset[batch_idx % len(compute_nodes)].append((data, target))

def update(data, target, model, optimizer):
    model.send(data.location)
    optimizer.zero_grad()
    pred = model(data)
    loss = F.mse_loss(pred.view(-1), target)
    loss.backward()
    optimizer.step()
    return model

bobs_model = Net()
alices_model = Net()

bobs_optimizer = optim.SGD(bobs_model.parameters(), lr=args.lr)
alices_optimizer = optim.SGD(alices_model.parameters(), lr=args.lr)

models = [bobs_model, alices_model]
params = [list(bobs_model.parameters()), list(alices_model.parameters())]
optimizers = [bobs_optimizer, alices_optimizer]

Aufbauen der Trainingslogik

Der einzige echte Unterschied ist in der Trainings-Methode zu finden. Dies wird Schritt-für-Schritt beleuchtet.

Teil A: Training:


In [ ]:
# this is selecting which batch to train on
data_index = 0
# update remote models
# we could iterate this multiple times before proceeding, but we're only iterating once per worker here
for remote_index in range(len(compute_nodes)):
    data, target = remote_dataset[remote_index][data_index]
    models[remote_index] = update(data, target, models[remote_index], optimizers[remote_index])

Teil B: Verschlüsseltes Ansammeln


In [ ]:
# create a list where we'll deposit our encrypted model average
new_params = list()

In [ ]:
# iterate through each parameter
for param_i in range(len(params[0])):

    # for each worker
    spdz_params = list()
    for remote_index in range(len(compute_nodes)):
        
        # select the identical parameter from each worker and copy it
        copy_of_parameter = params[remote_index][param_i].copy()
        
        # since SMPC can only work with integers (not floats), we need
        # to use Integers to store decimal information. In other words,
        # we need to use "Fixed Precision" encoding.
        fixed_precision_param = copy_of_parameter.fix_precision()
        
        # now we encrypt it on the remote machine. Note that 
        # fixed_precision_param is ALREADY a pointer. Thus, when
        # we call share, it actually encrypts the data that the
        # data is pointing TO. This returns a POINTER to the 
        # MPC secret shared object, which we need to fetch.
        encrypted_param = fixed_precision_param.share(bob, alice, crypto_provider=james)
        
        # now we fetch the pointer to the MPC shared value
        param = encrypted_param.get()
        
        # save the parameter so we can average it with the same parameter
        # from the other workers
        spdz_params.append(param)

    # average params from multiple workers, fetch them to the local machine
    # decrypt and decode (from fixed precision) back into a floating point number
    new_param = (spdz_params[0] + spdz_params[1]).get().float_precision()/2
    
    # save the new averaged parameter
    new_params.append(new_param)

Teil C: Aufräumen


In [ ]:
with torch.no_grad():
    for model in params:
        for param in model:
            param *= 0

    for model in models:
        model.get()

    for remote_index in range(len(compute_nodes)):
        for param_index in range(len(params[remote_index])):
            params[remote_index][param_index].set_(new_params[param_index])

Zusammenführen aller Teile!!

Da nun jeder der Schritte vorgestellt wurde, werden sie jetzt in einer Trainings-Schleife vereint!


In [ ]:
def train(epoch):
    for data_index in range(len(remote_dataset[0])-1):
        # update remote models
        for remote_index in range(len(compute_nodes)):
            data, target = remote_dataset[remote_index][data_index]
            models[remote_index] = update(data, target, models[remote_index], optimizers[remote_index])

        # encrypted aggregation
        new_params = list()
        for param_i in range(len(params[0])):
            spdz_params = list()
            for remote_index in range(len(compute_nodes)):
                spdz_params.append(params[remote_index][param_i].copy().fix_precision().share(bob, alice, crypto_provider=james).get())

            new_param = (spdz_params[0] + spdz_params[1]).get().float_precision()/2
            new_params.append(new_param)

        # cleanup
        with torch.no_grad():
            for model in params:
                for param in model:
                    param *= 0

            for model in models:
                model.get()

            for remote_index in range(len(compute_nodes)):
                for param_index in range(len(params[remote_index])):
                    params[remote_index][param_index].set_(new_params[param_index])

In [ ]:
def test():
    models[0].eval()
    test_loss = 0
    for data, target in test_loader:
        output = models[0](data)
        test_loss += F.mse_loss(output.view(-1), target, reduction='sum').item() # sum up batch loss
        pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability
        
    test_loss /= len(test_loader.dataset)
    print('Test set: Average loss: {:.4f}\n'.format(test_loss))

In [ ]:
t = time.time()

for epoch in range(args.epochs):
    print(f"Epoch {epoch + 1}")
    train(epoch)
    test()

    
total_time = time.time() - t
print('Total', round(total_time, 2), 's')

Herzlichen Glückwunsch!!! - Zeit, der Community beizutreten!

Herzlichen Glückwunsch zum Abschluss dieses Notebook-Tutorials! Wenn es Ihnen gefallen hat und Sie sich der Bewegung zur Wahrung der Privatsphäre, zum dezentralisiertenen Besitz von KI und der KI-Lieferkette (Daten) anschließen möchten, können Sie dies auf folgende Weise tun!

PySyft auf GitHub einen Stern geben!

Der einfachste Weg, unserer Community zu helfen, besteht darin, die GitHub-Repos mit Sternen auszuzeichnen! Dies hilft, das Bewusstsein für die coolen Tools zu schärfen, die wir bauen.

Mach mit bei Slack!

Der beste Weg, um über die neuesten Entwicklungen auf dem Laufenden zu bleiben, ist, sich unserer Community anzuschließen! Sie können dies tun, indem Sie das Formular unter http://slack.openmined.org ausfüllen.

Treten Sie einem Code-Projekt bei!

Der beste Weg, um zu unserer Community beizutragen, besteht darin, Entwickler zu werden! Sie können jederzeit zur PySyft GitHub Issues-Seite gehen und nach "Projects" filtern. Dies zeigt Ihnen alle Top-Level-Tickets und gibt einen Überblick darüber, an welchen Projekten Sie teilnehmen können! Wenn Sie nicht an einem Projekt teilnehmen möchten, aber ein wenig programmieren möchten, können Sie auch nach weiteren "einmaligen" Miniprojekten suchen, indem Sie nach GitHub-Problemen suchen, die als "good first issue" gekennzeichnet sind.

Spenden

Wenn Sie keine Zeit haben, zu unserer Codebase beizutragen, aber dennoch Unterstützung leisten möchten, können Sie auch Unterstützer unseres Open Collective werden. Alle Spenden fließen in unser Webhosting und andere Community-Ausgaben wie Hackathons und Meetups!


In [ ]: